/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

package software.amazon.awssdk.http.auth.aws.crt.internal.signer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertNull;
import static software.amazon.awssdk.checksums.DefaultChecksumAlgorithm.CRC32;
import static software.amazon.awssdk.checksums.DefaultChecksumAlgorithm.CRC32C;
import static software.amazon.awssdk.checksums.DefaultChecksumAlgorithm.CRC64NVME;
import static software.amazon.awssdk.checksums.DefaultChecksumAlgorithm.SHA1;
import static software.amazon.awssdk.checksums.DefaultChecksumAlgorithm.SHA256;
import static software.amazon.awssdk.crt.auth.signing.AwsSigningConfig.AwsSignatureType.HTTP_REQUEST_VIA_HEADERS;
import static software.amazon.awssdk.crt.auth.signing.AwsSigningConfig.AwsSignatureType.HTTP_REQUEST_VIA_QUERY_PARAMS;
import static software.amazon.awssdk.crt.auth.signing.AwsSigningConfig.AwsSignedBodyValue.STREAMING_AWS4_ECDSA_P256_SHA256_PAYLOAD;
import static software.amazon.awssdk.crt.auth.signing.AwsSigningConfig.AwsSignedBodyValue.STREAMING_AWS4_ECDSA_P256_SHA256_PAYLOAD_TRAILER;
import static software.amazon.awssdk.crt.auth.signing.AwsSigningConfig.AwsSignedBodyValue.STREAMING_UNSIGNED_PAYLOAD_TRAILER;
import static software.amazon.awssdk.crt.auth.signing.AwsSigningConfig.AwsSignedBodyValue.UNSIGNED_PAYLOAD;
import static software.amazon.awssdk.crt.auth.signing.AwsSigningConfig.AwsSigningAlgorithm.SIGV4_ASYMMETRIC;
import static software.amazon.awssdk.http.auth.aws.TestUtils.AnonymousCredentialsIdentity;
import static software.amazon.awssdk.http.auth.aws.crt.TestUtils.generateBasicAsyncRequest;
import static software.amazon.awssdk.http.auth.aws.crt.TestUtils.generateBasicRequest;
import static software.amazon.awssdk.http.auth.aws.crt.TestUtils.testPayload;
import static software.amazon.awssdk.http.auth.aws.crt.internal.util.CrtUtils.toCredentials;
import static software.amazon.awssdk.http.auth.aws.internal.signer.util.ChecksumUtil.readAll;
import static software.amazon.awssdk.http.auth.aws.signer.AwsV4FamilyHttpSigner.CHECKSUM_ALGORITHM;
import static software.amazon.awssdk.http.auth.aws.signer.AwsV4FamilyHttpSigner.SERVICE_SIGNING_NAME;
import static software.amazon.awssdk.http.auth.aws.signer.AwsV4aHttpSigner.AUTH_LOCATION;
import static software.amazon.awssdk.http.auth.aws.signer.AwsV4aHttpSigner.AuthLocation;
import static software.amazon.awssdk.http.auth.aws.signer.AwsV4aHttpSigner.CHUNK_ENCODING_ENABLED;
import static software.amazon.awssdk.http.auth.aws.signer.AwsV4aHttpSigner.EXPIRATION_DURATION;
import static software.amazon.awssdk.http.auth.aws.signer.AwsV4aHttpSigner.PAYLOAD_SIGNING_ENABLED;
import static software.amazon.awssdk.http.auth.aws.signer.AwsV4aHttpSigner.REGION_SET;
import static software.amazon.awssdk.http.auth.spi.signer.HttpSigner.SIGNING_CLOCK;
import static software.amazon.awssdk.http.auth.spi.signer.SdkInternalHttpSignerProperty.CHECKSUM_STORE;

import io.reactivex.Flowable;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.reactivestreams.Publisher;
import software.amazon.awssdk.checksums.SdkChecksum;
import software.amazon.awssdk.checksums.spi.ChecksumAlgorithm;
import software.amazon.awssdk.crt.Log;
import software.amazon.awssdk.crt.auth.signing.AwsSigningConfig;
import software.amazon.awssdk.http.Header;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.http.auth.aws.TestUtils;
import software.amazon.awssdk.http.auth.aws.signer.RegionSet;
import software.amazon.awssdk.http.auth.spi.signer.AsyncSignRequest;
import software.amazon.awssdk.http.auth.spi.signer.AsyncSignedRequest;
import software.amazon.awssdk.http.auth.spi.signer.PayloadChecksumStore;
import software.amazon.awssdk.http.auth.spi.signer.SignRequest;
import software.amazon.awssdk.http.auth.spi.signer.SignedRequest;
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
import software.amazon.awssdk.utils.BinaryUtils;
import software.amazon.awssdk.utils.ImmutableMap;
import software.amazon.awssdk.utils.IoUtils;


/**
 * Functional tests for the Sigv4a signer. These tests call the CRT native signer code.
 */
public class DefaultAwsCrtV4aHttpSignerTest {
    static {
        // Execute this statement before constructing the SDK service client.
        Log.initLoggingToStdout(Log.LogLevel.Trace);
    }

    DefaultAwsCrtV4aHttpSigner signer = new DefaultAwsCrtV4aHttpSigner();

    private static final Map<ChecksumAlgorithm, String> ALGORITHM_TO_VALUE = ImmutableMap.<ChecksumAlgorithm, String>builder()
                                                                              .put(CRC32, "i9aeUg==")
                                                                              .put(CRC32C, "crUfeA==")
                                                                              .put(SHA1, "e1AsOh9IyGCa4hLN+2Od7jlnP14=")
                                                                              .put(SHA256,
                                                                                   "ZOyIygCyaOW6GjVnihtTFtIS9PNmskdyMlNKiuyjfzw=")
                                                                              .put(CRC64NVME, "OOJZ0D8xKts=")
                                                                              .build();

    private static final String PAYLOAD_SHA256_HEX = BinaryUtils.toHex(computeChecksum(SHA256, testPayload()));

    public static Stream<Map.Entry<ChecksumAlgorithm, String>> checksumAlgorithmToValueParams() {
        return ALGORITHM_TO_VALUE.entrySet().stream();
    }

    @Test
    void sign_withBasicRequest_shouldSignWithHeaders() {
        AwsCredentialsIdentity credentials =
            AwsCredentialsIdentity.create("AKIDEXAMPLE", "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY");
        SignRequest<AwsCredentialsIdentity> request = generateBasicRequest(
            credentials,
            httpRequest -> httpRequest.port(443),
            signRequest -> {
            }
        );

        AwsSigningConfig expectedSigningConfig = new AwsSigningConfig();
        expectedSigningConfig.setCredentials(toCredentials(request.identity()));
        expectedSigningConfig.setService("demo");
        expectedSigningConfig.setRegion("aws-global");
        expectedSigningConfig.setAlgorithm(SIGV4_ASYMMETRIC);
        expectedSigningConfig.setTime(1596476903000L);
        expectedSigningConfig.setUseDoubleUriEncode(true);
        expectedSigningConfig.setShouldNormalizeUriPath(true);
        expectedSigningConfig.setSignatureType(HTTP_REQUEST_VIA_HEADERS);

        SignedRequest signedRequest = signer.sign(request);

        assertThat(signedRequest.request().firstMatchingHeader("Host")).hasValue("demo.us-east-1.amazonaws.com");
        assertThat(signedRequest.request().firstMatchingHeader("X-Amz-Date")).hasValue("20200803T174823Z");
        assertThat(signedRequest.request().firstMatchingHeader("X-Amz-Region-Set")).hasValue("aws-global");
        assertThat(signedRequest.request().firstMatchingHeader("Authorization")).isPresent();
        assertThat(signedRequest.request().firstMatchingHeader("x-amz-content-sha256")).contains(PAYLOAD_SHA256_HEX);
    }

    @Test
    // NOTE: differs from sync version by requiring content-length
    void signAsync_withBasicRequest_shouldSignWithHeaders() {
        AwsCredentialsIdentity credentials =
            AwsCredentialsIdentity.create("AKIDEXAMPLE", "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY");
        AsyncSignRequest<AwsCredentialsIdentity> request = generateBasicAsyncRequest(
            credentials,
            httpRequest -> httpRequest.port(443)
                                      .putHeader("content-length", Long.toString(testPayload().length)),
            signRequest -> {
            }
        );

        AwsSigningConfig expectedSigningConfig = new AwsSigningConfig();
        expectedSigningConfig.setCredentials(toCredentials(request.identity()));
        expectedSigningConfig.setService("demo");
        expectedSigningConfig.setRegion("aws-global");
        expectedSigningConfig.setAlgorithm(SIGV4_ASYMMETRIC);
        expectedSigningConfig.setTime(1596476903000L);
        expectedSigningConfig.setUseDoubleUriEncode(true);
        expectedSigningConfig.setShouldNormalizeUriPath(true);
        expectedSigningConfig.setSignatureType(HTTP_REQUEST_VIA_HEADERS);

        AsyncSignedRequest signedRequest = signer.signAsync(request).join();

        assertThat(signedRequest.request().firstMatchingHeader("Host")).hasValue("demo.us-east-1.amazonaws.com");
        assertThat(signedRequest.request().firstMatchingHeader("X-Amz-Date")).hasValue("20200803T174823Z");
        assertThat(signedRequest.request().firstMatchingHeader("X-Amz-Region-Set")).hasValue("aws-global");
        assertThat(signedRequest.request().firstMatchingHeader("Authorization")).isPresent();
        assertThat(signedRequest.request().firstMatchingHeader("x-amz-content-sha256")).contains(PAYLOAD_SHA256_HEX);
    }

    @Test
    void signAsync_withBasicRequest_signWithHeaders_contentLengthNotPresent_throws() {
        AwsCredentialsIdentity credentials =
            AwsCredentialsIdentity.create("AKIDEXAMPLE", "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY");
        AsyncSignRequest<AwsCredentialsIdentity> request = generateBasicAsyncRequest(
            credentials,
            httpRequest -> httpRequest.port(443),
            signRequest -> {
            }
        );

        AwsSigningConfig expectedSigningConfig = new AwsSigningConfig();
        expectedSigningConfig.setCredentials(toCredentials(request.identity()));
        expectedSigningConfig.setService("demo");
        expectedSigningConfig.setRegion("aws-global");
        expectedSigningConfig.setAlgorithm(SIGV4_ASYMMETRIC);
        expectedSigningConfig.setTime(1596476903000L);
        expectedSigningConfig.setUseDoubleUriEncode(true);
        expectedSigningConfig.setShouldNormalizeUriPath(true);
        expectedSigningConfig.setSignatureType(HTTP_REQUEST_VIA_HEADERS);

        assertThatThrownBy(signer.signAsync(request)::join)
            .hasCauseInstanceOf(UnsupportedOperationException.class)
            .hasMessageContaining("Content-Length");
    }

    @Test
    void sign_withQuery_shouldSignWithQueryParams() {
        AwsCredentialsIdentity credentials =
            AwsCredentialsIdentity.create("AKIDEXAMPLE", "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY");
        SignRequest<AwsCredentialsIdentity> request = generateBasicRequest(
            credentials,
            httpRequest -> httpRequest.port(443),
            signRequest ->
                signRequest.putProperty(AUTH_LOCATION, AuthLocation.QUERY_STRING)
        );

        AwsSigningConfig expectedSigningConfig = new AwsSigningConfig();
        expectedSigningConfig.setCredentials(toCredentials(request.identity()));
        expectedSigningConfig.setService("demo");
        expectedSigningConfig.setRegion("aws-global");
        expectedSigningConfig.setAlgorithm(SIGV4_ASYMMETRIC);
        expectedSigningConfig.setTime(1596476903000L);
        expectedSigningConfig.setUseDoubleUriEncode(true);
        expectedSigningConfig.setShouldNormalizeUriPath(true);
        expectedSigningConfig.setSignatureType(HTTP_REQUEST_VIA_QUERY_PARAMS);

        SignedRequest signedRequest = signer.sign(request);

        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Algorithm"))
            .hasValue("AWS4-ECDSA-P256-SHA256");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Credential"))
            .hasValue("AKIDEXAMPLE/20200803/demo/aws4_request");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Date")).hasValue("20200803T174823Z");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-SignedHeaders"))
            .hasValue("host;x-amz-archive-description");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Region-Set")).hasValue("aws-global");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Signature")).isPresent();
    }

    @Test
    // NOTE: differs from sync version by requiring content-length
    void signAsync_withQuery_shouldSignWithQueryParams() {
        AwsCredentialsIdentity credentials =
            AwsCredentialsIdentity.create("AKIDEXAMPLE", "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY");
        AsyncSignRequest<AwsCredentialsIdentity> request = generateBasicAsyncRequest(
            credentials,
            httpRequest -> httpRequest.port(443).putHeader("content-length", Long.toString(testPayload().length)),
            signRequest ->
                signRequest.putProperty(AUTH_LOCATION, AuthLocation.QUERY_STRING)
        );

        AwsSigningConfig expectedSigningConfig = new AwsSigningConfig();
        expectedSigningConfig.setCredentials(toCredentials(request.identity()));
        expectedSigningConfig.setService("demo");
        expectedSigningConfig.setRegion("aws-global");
        expectedSigningConfig.setAlgorithm(SIGV4_ASYMMETRIC);
        expectedSigningConfig.setTime(1596476903000L);
        expectedSigningConfig.setUseDoubleUriEncode(true);
        expectedSigningConfig.setShouldNormalizeUriPath(true);
        expectedSigningConfig.setSignatureType(HTTP_REQUEST_VIA_QUERY_PARAMS);

        AsyncSignedRequest signedRequest = signer.signAsync(request).join();

        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Algorithm"))
            .hasValue("AWS4-ECDSA-P256-SHA256");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Credential"))
            .hasValue("AKIDEXAMPLE/20200803/demo/aws4_request");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Date")).hasValue("20200803T174823Z");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-SignedHeaders"))
            .hasValue("content-length;host;x-amz-archive-description");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Region-Set")).hasValue("aws-global");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Signature")).isPresent();
    }

    @Test
    void sign_requestWithQueryEncodedParamValue_shouldEncodedValue() {
        AwsCredentialsIdentity credentials =
            AwsCredentialsIdentity.create("AKIDEXAMPLE", "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY");
        SignRequest<AwsCredentialsIdentity> request =
            SignRequest.builder(credentials)
                       .request(SdkHttpRequest.builder()
                                              .method(SdkHttpMethod.POST)
                                              .port(443)
                                              .putHeader("x-amz-archive-description", "test  test")
                                              .putHeader("Host", "demo.us-east-1.amazonaws.com")
                                              .encodedPath("/")
                                              .uri(URI.create("https://demo.us-east-1.amazonaws.com"))
                                              .appendRawQueryParameter("goodParam1", "123")
                                              .appendRawQueryParameter("badParam", "abc&xyz")
                                              .appendRawQueryParameter("goodParam2", "abc")
                                              .build())
                       .payload(() -> new ByteArrayInputStream("{\"TableName\": \"foo\"}".getBytes()))
                       .putProperty(REGION_SET, RegionSet.create("aws-global"))
                       .putProperty(SERVICE_SIGNING_NAME, "demo")
                       .putProperty(SIGNING_CLOCK, new TestUtils.TickingClock(Instant.ofEpochMilli(1596476903000L)))
                       .putProperty(AUTH_LOCATION, AuthLocation.QUERY_STRING)
                       .build();

        SignedRequest signedRequest = signer.sign(request);
        Map<String, List<String>> queryParam = signedRequest.request().rawQueryParameters();
        assertThat(queryParam).doesNotContainKey("xyz");
        assertThat(queryParam).containsKeys("goodParam1", "badParam", "goodParam2");

        assertThat(signedRequest.request().encodedQueryParameters())
            .isPresent()
            .get()
            .matches(str -> str.contains("badParam=abc%26xyz"));

        assertThat(signedRequest.request().firstMatchingRawQueryParameter("goodParam1"))
            .hasValue("123");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("badParam"))
            .hasValue("abc&xyz");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("goodParam2"))
            .hasValue("abc");

        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Algorithm"))
            .hasValue("AWS4-ECDSA-P256-SHA256");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Credential"))
            .hasValue("AKIDEXAMPLE/20200803/demo/aws4_request");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Date")).hasValue("20200803T174823Z");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-SignedHeaders"))
            .hasValue("host;x-amz-archive-description");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Region-Set")).hasValue("aws-global");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Signature")).isPresent();
    }

    @Test
    // NOTE: differs from sync version by requiring content-length
    void signAsync_requestWithQueryEncodedParamValue_shouldEncodedValue() {
        byte[] content = "{\"TableName\": \"foo\"}".getBytes(StandardCharsets.UTF_8);

        AwsCredentialsIdentity credentials =
            AwsCredentialsIdentity.create("AKIDEXAMPLE", "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY");
        AsyncSignRequest<AwsCredentialsIdentity> request =
            AsyncSignRequest.builder(credentials)
                       .request(SdkHttpRequest.builder()
                                              .method(SdkHttpMethod.POST)
                                              .port(443)
                                              .putHeader("x-amz-archive-description", "test  test")
                                              .putHeader("Host", "demo.us-east-1.amazonaws.com")
                                              .putHeader("content-length", Integer.toString(content.length))
                                              .encodedPath("/")
                                              .uri(URI.create("https://demo.us-east-1.amazonaws.com"))
                                              .appendRawQueryParameter("goodParam1", "123")
                                              .appendRawQueryParameter("badParam", "abc&xyz")
                                              .appendRawQueryParameter("goodParam2", "abc")
                                              .build())
                       .payload(Flowable.just(ByteBuffer.wrap(content)))
                       .putProperty(REGION_SET, RegionSet.create("aws-global"))
                       .putProperty(SERVICE_SIGNING_NAME, "demo")
                       .putProperty(SIGNING_CLOCK, new TestUtils.TickingClock(Instant.ofEpochMilli(1596476903000L)))
                       .putProperty(AUTH_LOCATION, AuthLocation.QUERY_STRING)
                       .build();

        AsyncSignedRequest signedRequest = signer.signAsync(request).join();
        Map<String, List<String>> queryParam = signedRequest.request().rawQueryParameters();
        assertThat(queryParam).doesNotContainKey("xyz");
        assertThat(queryParam).containsKeys("goodParam1", "badParam", "goodParam2");

        assertThat(signedRequest.request().encodedQueryParameters())
            .isPresent()
            .get()
            .matches(str -> str.contains("badParam=abc%26xyz"));

        assertThat(signedRequest.request().firstMatchingRawQueryParameter("goodParam1"))
            .hasValue("123");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("badParam"))
            .hasValue("abc&xyz");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("goodParam2"))
            .hasValue("abc");

        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Algorithm"))
            .hasValue("AWS4-ECDSA-P256-SHA256");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Credential"))
            .hasValue("AKIDEXAMPLE/20200803/demo/aws4_request");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Date")).hasValue("20200803T174823Z");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-SignedHeaders"))
            .hasValue("content-length;host;x-amz-archive-description");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Region-Set")).hasValue("aws-global");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Signature")).isPresent();
    }

    @Test
    void sign_withQueryAndExpiration_shouldSignWithQueryParamsAndExpire() {
        AwsCredentialsIdentity credentials =
            AwsCredentialsIdentity.create("AKIDEXAMPLE", "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY");
        SignRequest<AwsCredentialsIdentity> request = generateBasicRequest(
            credentials,
            httpRequest -> httpRequest.port(443),
            signRequest -> signRequest
                .putProperty(AUTH_LOCATION, AuthLocation.QUERY_STRING)
                .putProperty(EXPIRATION_DURATION, Duration.ofSeconds(1))
        );

        AwsSigningConfig expectedSigningConfig = new AwsSigningConfig();
        expectedSigningConfig.setCredentials(toCredentials(request.identity()));
        expectedSigningConfig.setService("demo");
        expectedSigningConfig.setRegion("aws-global");
        expectedSigningConfig.setAlgorithm(SIGV4_ASYMMETRIC);
        expectedSigningConfig.setTime(1596476903000L);
        expectedSigningConfig.setUseDoubleUriEncode(true);
        expectedSigningConfig.setShouldNormalizeUriPath(true);
        expectedSigningConfig.setSignatureType(HTTP_REQUEST_VIA_QUERY_PARAMS);
        expectedSigningConfig.setExpirationInSeconds(1);

        SignedRequest signedRequest = signer.sign(request);

        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Algorithm"))
            .hasValue("AWS4-ECDSA-P256-SHA256");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Credential"))
            .hasValue("AKIDEXAMPLE/20200803/demo/aws4_request");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Date")).hasValue("20200803T174823Z");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-SignedHeaders"))
            .hasValue("host;x-amz-archive-description");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Region-Set")).hasValue("aws-global");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Signature")).isPresent();
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Expires")).hasValue("1");
    }

    @Test
    // NOTE: differs from sync version by requiring content-length
    void signAsync_withQueryAndExpiration_shouldSignWithQueryParamsAndExpire() {
        String contentLength = Long.toString(testPayload().length);
        AwsCredentialsIdentity credentials =
            AwsCredentialsIdentity.create("AKIDEXAMPLE", "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY");
        AsyncSignRequest<AwsCredentialsIdentity> request = generateBasicAsyncRequest(
            credentials,
            httpRequest -> httpRequest.port(443)
                                      .putHeader("content-length", contentLength),
            signRequest -> signRequest
                .putProperty(AUTH_LOCATION, AuthLocation.QUERY_STRING)
                .putProperty(EXPIRATION_DURATION, Duration.ofSeconds(1))
        );

        AwsSigningConfig expectedSigningConfig = new AwsSigningConfig();
        expectedSigningConfig.setCredentials(toCredentials(request.identity()));
        expectedSigningConfig.setService("demo");
        expectedSigningConfig.setRegion("aws-global");
        expectedSigningConfig.setAlgorithm(SIGV4_ASYMMETRIC);
        expectedSigningConfig.setTime(1596476903000L);
        expectedSigningConfig.setUseDoubleUriEncode(true);
        expectedSigningConfig.setShouldNormalizeUriPath(true);
        expectedSigningConfig.setSignatureType(HTTP_REQUEST_VIA_QUERY_PARAMS);
        expectedSigningConfig.setExpirationInSeconds(1);

        AsyncSignedRequest signedRequest = signer.signAsync(request).join();

        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Algorithm"))
            .hasValue("AWS4-ECDSA-P256-SHA256");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Credential"))
            .hasValue("AKIDEXAMPLE/20200803/demo/aws4_request");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Date")).hasValue("20200803T174823Z");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-SignedHeaders"))
            .hasValue("content-length;host;x-amz-archive-description");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Region-Set")).hasValue("aws-global");
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Signature")).isPresent();
        assertThat(signedRequest.request().firstMatchingRawQueryParameter("X-Amz-Expires")).hasValue("1");
    }

    @Test
    void sign_withUnsignedPayload_shouldNotSignPayload() {
        AwsCredentialsIdentity credentials =
            AwsCredentialsIdentity.create("AKIDEXAMPLE", "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY");
        SignRequest<AwsCredentialsIdentity> request = generateBasicRequest(
            credentials,
            httpRequest -> {
            },
            signRequest -> signRequest
                .putProperty(PAYLOAD_SIGNING_ENABLED, false)
        );

        AwsSigningConfig expectedSigningConfig = new AwsSigningConfig();
        expectedSigningConfig.setCredentials(toCredentials(request.identity()));
        expectedSigningConfig.setService("demo");
        expectedSigningConfig.setRegion("aws-global");
        expectedSigningConfig.setAlgorithm(SIGV4_ASYMMETRIC);
        expectedSigningConfig.setTime(1596476903000L);
        expectedSigningConfig.setUseDoubleUriEncode(true);
        expectedSigningConfig.setShouldNormalizeUriPath(true);
        expectedSigningConfig.setSignatureType(HTTP_REQUEST_VIA_HEADERS);
        expectedSigningConfig.setSignedBodyValue(UNSIGNED_PAYLOAD);

        SignedRequest signedRequest = signer.sign(request);

        assertThat(signedRequest.request().firstMatchingHeader("Host")).hasValue("demo.us-east-1.amazonaws.com");
        assertThat(signedRequest.request().firstMatchingHeader("X-Amz-Date")).hasValue("20200803T174823Z");
        assertThat(signedRequest.request().firstMatchingHeader("X-Amz-Region-Set")).hasValue("aws-global");
        assertThat(signedRequest.request().firstMatchingHeader("Authorization")).isPresent();
        assertThat(signedRequest.request().firstMatchingHeader("x-amz-content-sha256")).contains(UNSIGNED_PAYLOAD);
    }

    @Test
    // NOTE: differs from sync version by requiring content-length
    void signAsync_withUnsignedPayload_shouldNotSignPayload() {
        String contentLength = Long.toString(testPayload().length);

        AwsCredentialsIdentity credentials =
            AwsCredentialsIdentity.create("AKIDEXAMPLE", "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY");
        AsyncSignRequest<AwsCredentialsIdentity> request = generateBasicAsyncRequest(
            credentials,
            httpRequest -> {
                httpRequest.putHeader("content-length", contentLength);
            },
            signRequest -> signRequest
                .putProperty(PAYLOAD_SIGNING_ENABLED, false)
        );

        AwsSigningConfig expectedSigningConfig = new AwsSigningConfig();
        expectedSigningConfig.setCredentials(toCredentials(request.identity()));
        expectedSigningConfig.setService("demo");
        expectedSigningConfig.setRegion("aws-global");
        expectedSigningConfig.setAlgorithm(SIGV4_ASYMMETRIC);
        expectedSigningConfig.setTime(1596476903000L);
        expectedSigningConfig.setUseDoubleUriEncode(true);
        expectedSigningConfig.setShouldNormalizeUriPath(true);
        expectedSigningConfig.setSignatureType(HTTP_REQUEST_VIA_HEADERS);
        expectedSigningConfig.setSignedBodyValue(UNSIGNED_PAYLOAD);

        AsyncSignedRequest signedRequest = signer.signAsync(request).join();

        assertThat(signedRequest.request().firstMatchingHeader("Host")).hasValue("demo.us-east-1.amazonaws.com");
        assertThat(signedRequest.request().firstMatchingHeader("X-Amz-Date")).hasValue("20200803T174823Z");
        assertThat(signedRequest.request().firstMatchingHeader("X-Amz-Region-Set")).hasValue("aws-global");
        assertThat(signedRequest.request().firstMatchingHeader("Authorization")).isPresent();
        assertThat(signedRequest.request().firstMatchingHeader("x-amz-content-sha256")).contains(UNSIGNED_PAYLOAD);
    }

    @Test
    void sign_withAnonymousCredentials_shouldNotSign() {
        AwsCredentialsIdentity credentials = new AnonymousCredentialsIdentity();
        SignRequest<? extends AwsCredentialsIdentity> request = generateBasicRequest(
            credentials,
            httpRequest -> {
            },
            signRequest -> {
            }
        );

        SignedRequest signedRequest = signer.sign(request);

        assertNull(signedRequest.request().headers().get("Authorization"));
    }

    @Test
    @Disabled // TODO: should this REALLY throw? we're not signing....
    void signAsync_withAnonymousCredentials_shouldNotSign() {
        AwsCredentialsIdentity credentials = new AnonymousCredentialsIdentity();
        AsyncSignRequest<? extends AwsCredentialsIdentity> request = generateBasicAsyncRequest(
            credentials,
            httpRequest -> {
            },
            signRequest -> {
            }
        );

        AsyncSignedRequest signedRequest = signer.signAsync(request).join();

        assertNull(signedRequest.request().headers().get("Authorization"));
    }

    @Test
    void sign_WithChunkEncodingTrue_DelegatesToAwsChunkedPayloadSigner() {
        SignRequest<? extends AwsCredentialsIdentity> request = generateBasicRequest(
            AwsCredentialsIdentity.create("access", "secret"),
            httpRequest -> httpRequest
                .putHeader(Header.CONTENT_LENGTH, "20"),
            signRequest -> signRequest
                .putProperty(CHUNK_ENCODING_ENABLED, true)
        );

        SignedRequest signedRequest = signer.sign(request);

        assertThat(signedRequest.request().firstMatchingHeader("x-amz-content-sha256"))
            .hasValue(STREAMING_AWS4_ECDSA_P256_SHA256_PAYLOAD);
        assertThat(signedRequest.request().firstMatchingHeader(Header.CONTENT_LENGTH)).hasValue("353");
        assertThat(signedRequest.request().firstMatchingHeader("x-amz-decoded-content-length")).hasValue("20");

        // Ensures that CRT runs correctly and without throwing an exception
        readAll(signedRequest.payload().get().newStream());
    }

    @Test
    void signAsync_WithChunkEncodingTrue_DelegatesToAwsChunkedPayloadSigner() {
        String contentLength = Long.toString(testPayload().length);

        AsyncSignRequest<? extends AwsCredentialsIdentity> request = generateBasicAsyncRequest(
            AwsCredentialsIdentity.create("access", "secret"),
            httpRequest -> httpRequest
                .putHeader(Header.CONTENT_LENGTH, contentLength),
            signRequest -> signRequest
                .putProperty(CHUNK_ENCODING_ENABLED, true)
        );

        AsyncSignedRequest signedRequest = signer.signAsync(request).join();

        assertThat(signedRequest.request().firstMatchingHeader("x-amz-content-sha256"))
            .hasValue(STREAMING_AWS4_ECDSA_P256_SHA256_PAYLOAD);
        assertThat(signedRequest.request().firstMatchingHeader(Header.CONTENT_LENGTH)).hasValue("343");
        assertThat(signedRequest.request().firstMatchingHeader("x-amz-decoded-content-length")).hasValue(contentLength);

        getAllItems(signedRequest.payload().get());
    }

    @Test
    void sign_WithChunkEncodingTrueAndChecksumAlgorithm_DelegatesToAwsChunkedPayloadSigner() throws IOException {
        String contentLength = Long.toString(testPayload().length);

        SignRequest<? extends AwsCredentialsIdentity> request = generateBasicRequest(
            AwsCredentialsIdentity.create("access", "secret"),
            httpRequest -> httpRequest
                .putHeader(Header.CONTENT_LENGTH, contentLength),
            signRequest -> signRequest
                .putProperty(CHUNK_ENCODING_ENABLED, true)
                .putProperty(CHECKSUM_ALGORITHM, CRC32)
        );

        SignedRequest signedRequest = signer.sign(request);

        assertThat(signedRequest.request().firstMatchingHeader("x-amz-content-sha256"))
            .hasValue(STREAMING_AWS4_ECDSA_P256_SHA256_PAYLOAD_TRAILER);
        assertThat(signedRequest.request().firstMatchingHeader(Header.CONTENT_LENGTH)).hasValue("544");
        assertThat(signedRequest.request().firstMatchingHeader("x-amz-decoded-content-length")).hasValue(contentLength);
        assertThat(signedRequest.request().firstMatchingHeader("x-amz-trailer")).hasValue("x-amz-checksum-crc32");

        byte[] content = IoUtils.toByteArray(signedRequest.payload().get().newStream());
        assertThat(content.length).isEqualTo(544);

        assertThat(asUtf8String(content)).contains(String.format("x-amz-checksum-crc32:%s\r\n", ALGORITHM_TO_VALUE.get(CRC32)));
    }

    @Test
    void signAsync_WithChunkEncodingTrueAndChecksumAlgorithm_DelegatesToAwsChunkedPayloadSigner() {
        String contentLength = Long.toString(testPayload().length);
        AsyncSignRequest<? extends AwsCredentialsIdentity> request = generateBasicAsyncRequest(
            AwsCredentialsIdentity.create("access", "secret"),
            httpRequest -> httpRequest
                .putHeader(Header.CONTENT_LENGTH, contentLength),
            signRequest -> signRequest
                .putProperty(CHUNK_ENCODING_ENABLED, true)
                .putProperty(CHECKSUM_ALGORITHM, CRC32)
        );

        AsyncSignedRequest signedRequest = signer.signAsync(request).join();

        assertThat(signedRequest.request().firstMatchingHeader("x-amz-content-sha256"))
            .hasValue(STREAMING_AWS4_ECDSA_P256_SHA256_PAYLOAD_TRAILER);
        assertThat(signedRequest.request().firstMatchingHeader(Header.CONTENT_LENGTH)).hasValue("544");
        assertThat(signedRequest.request().firstMatchingHeader("x-amz-decoded-content-length")).hasValue(contentLength);
        assertThat(signedRequest.request().firstMatchingHeader("x-amz-trailer")).hasValue("x-amz-checksum-crc32");

        List<ByteBuffer> allItems = getAllItems(signedRequest.payload().get());

        long length = allItems.stream().mapToLong(ByteBuffer::remaining).sum();
        assertThat(length).isEqualTo(544);

        assertThat(asUtf8String(allItems)).contains(String.format("x-amz-checksum-crc32:%s\r\n", ALGORITHM_TO_VALUE.get(CRC32)));
    }

    @Test
    void sign_WithPayloadSigningFalseAndChunkEncodingTrueAndTrailer_DelegatesToAwsChunkedPayloadSigner() throws IOException {
        String contentLength = Long.toString(testPayload().length);

        SignRequest<? extends AwsCredentialsIdentity> request = generateBasicRequest(
            AwsCredentialsIdentity.create("access", "secret"),
            httpRequest -> httpRequest
                .putHeader(Header.CONTENT_LENGTH, contentLength),
            signRequest -> signRequest
                .putProperty(PAYLOAD_SIGNING_ENABLED, false)
                .putProperty(CHUNK_ENCODING_ENABLED, true)
                .putProperty(CHECKSUM_ALGORITHM, CRC32)
        );

        SignedRequest signedRequest = signer.sign(request);

        assertThat(signedRequest.request().firstMatchingHeader("x-amz-content-sha256"))
            .hasValue(STREAMING_UNSIGNED_PAYLOAD_TRAILER);
        assertThat(signedRequest.request().firstMatchingHeader(Header.CONTENT_LENGTH)).hasValue("52");
        assertThat(signedRequest.request().firstMatchingHeader("x-amz-decoded-content-length")).hasValue(contentLength);
        assertThat(signedRequest.request().firstMatchingHeader("x-amz-trailer")).hasValue("x-amz-checksum-crc32");

        byte[] content = IoUtils.toByteArray(signedRequest.payload().get().newStream());
        assertThat(content.length).isEqualTo(52);

        assertThat(asUtf8String(content)).contains(String.format("x-amz-checksum-crc32:%s\r\n", ALGORITHM_TO_VALUE.get(CRC32)));
    }

    @Test
    void signAsync_WithPayloadSigningFalseAndChunkEncodingTrueAndTrailer_DelegatesToAwsChunkedPayloadSigner() {
        String contentLength = Long.toString(testPayload().length);

        AsyncSignRequest<? extends AwsCredentialsIdentity> request = generateBasicAsyncRequest(
            AwsCredentialsIdentity.create("access", "secret"),
            httpRequest -> httpRequest
                .putHeader(Header.CONTENT_LENGTH, contentLength),
            signRequest -> signRequest
                .putProperty(PAYLOAD_SIGNING_ENABLED, false)
                .putProperty(CHUNK_ENCODING_ENABLED, true)
                .putProperty(CHECKSUM_ALGORITHM, CRC32)
        );

        AsyncSignedRequest signedRequest = signer.signAsync(request).join();

        assertThat(signedRequest.request().firstMatchingHeader("x-amz-content-sha256"))
            .hasValue(STREAMING_UNSIGNED_PAYLOAD_TRAILER);
        assertThat(signedRequest.request().firstMatchingHeader(Header.CONTENT_LENGTH)).hasValue("52");
        assertThat(signedRequest.request().firstMatchingHeader("x-amz-decoded-content-length")).hasValue(contentLength);
        assertThat(signedRequest.request().firstMatchingHeader("x-amz-trailer")).hasValue("x-amz-checksum-crc32");

        List<ByteBuffer> allItems = getAllItems(signedRequest.payload().get());
        long length = allItems.stream().mapToLong(ByteBuffer::remaining).sum();
        assertThat(length).isEqualTo(52);

        assertThat(asUtf8String(allItems)).contains(String.format("x-amz-checksum-crc32:%s\r\n", ALGORITHM_TO_VALUE.get(CRC32)));
    }

    @Test
    void sign_WithPayloadSigningFalseAndChunkEncodingTrueWithoutTrailer_DelegatesToUnsignedPayload() {
        SignRequest<? extends AwsCredentialsIdentity> request = generateBasicRequest(
            AwsCredentialsIdentity.create("access", "secret"),
            httpRequest -> httpRequest
                .putHeader(Header.CONTENT_LENGTH, "20"),
            signRequest -> signRequest
                .putProperty(PAYLOAD_SIGNING_ENABLED, false)
                .putProperty(CHUNK_ENCODING_ENABLED, true)
        );

        SignedRequest signedRequest = signer.sign(request);
        assertThat(signedRequest.request().firstMatchingHeader("x-amz-content-sha256")).hasValue("UNSIGNED-PAYLOAD");
        assertThat(signedRequest.request().firstMatchingHeader("x-amz-decoded-content-length")).isNotPresent();
    }

    @Test
    void signAsync_WithPayloadSigningFalseAndChunkEncodingTrueWithoutTrailer_DelegatesToUnsignedPayload() {
        AsyncSignRequest<? extends AwsCredentialsIdentity> request = generateBasicAsyncRequest(
            AwsCredentialsIdentity.create("access", "secret"),
            httpRequest -> httpRequest
                .putHeader(Header.CONTENT_LENGTH, "20"),
            signRequest -> signRequest
                .putProperty(PAYLOAD_SIGNING_ENABLED, false)
                .putProperty(CHUNK_ENCODING_ENABLED, true)
        );

        AsyncSignedRequest signedRequest = signer.signAsync(request).join();
        assertThat(signedRequest.request().firstMatchingHeader("x-amz-content-sha256")).hasValue("UNSIGNED-PAYLOAD");
        assertThat(signedRequest.request().firstMatchingHeader("x-amz-decoded-content-length")).isNotPresent();
    }

    @ParameterizedTest
    @MethodSource("checksumAlgorithmToValueParams")
    void sign_checksumAlgorithmPresent_shouldAddChecksumHeader(Map.Entry<ChecksumAlgorithm, String> checksumToValue) {
        ChecksumAlgorithm checksumAlgorithm = checksumToValue.getKey();
        SignRequest<? extends AwsCredentialsIdentity> request = generateBasicRequest(
            AwsCredentialsIdentity.create("access", "secret"),
            httpRequest -> {
            },
            signRequest -> signRequest.putProperty(CHECKSUM_ALGORITHM, checksumAlgorithm)
        );

        SignedRequest signedRequest = signer.sign(request);
       assertThat(signedRequest.request().firstMatchingHeader("x-amz-checksum-" + checksumAlgorithm.algorithmId()
                                                                                                   .toLowerCase(Locale.US)))
           .contains(checksumToValue.getValue());
    }

    @Test
    void sign_checksumValueProvided_shouldNotOverrideChecksumHeader() {
        SignRequest<? extends AwsCredentialsIdentity> request = generateBasicRequest(
            AwsCredentialsIdentity.create("access", "secret"),
                httpRequest -> httpRequest
                    .putHeader("x-amz-checksum-crc32", "some value"),
            signRequest -> signRequest.putProperty(CHECKSUM_ALGORITHM, CRC32)
        );

        SignedRequest signedRequest = signer.sign(request);
        assertThat(signedRequest.request().firstMatchingHeader("x-amz-checksum-crc32"))
            .contains("some value");
    }

    @Test
    void sign_withProvidedHostHeader_shouldRespectUserHostHeader() {
        AwsCredentialsIdentity credentials =
            AwsCredentialsIdentity.create("access", "secret");

        String hostOverride = "virtual-host.localhost";
        SignRequest<AwsCredentialsIdentity> request = generateBasicRequest(
            credentials,
            httpRequest -> httpRequest.putHeader("Host", hostOverride).port(443),
            signRequest -> {

            }
        );

        SignedRequest signedRequest = signer.sign(request);

        assertThat(signedRequest.request().firstMatchingHeader("Host")).hasValue(hostOverride);
        assertThat(signedRequest.request().firstMatchingHeader("X-Amz-Date")).hasValue("20200803T174823Z");
        assertThat(signedRequest.request().firstMatchingHeader("X-Amz-Region-Set")).hasValue("aws-global");
        assertThat(signedRequest.request().firstMatchingHeader("Authorization")).isPresent();
    }

    @Test
    void sign_WithPayloadSigningFalse_chunkEncodingTrue_cacheEmpty_storesComputedChecksum() throws IOException {
        PayloadChecksumStore cache = PayloadChecksumStore.create();

        SignRequest<? extends AwsCredentialsIdentity> request = generateBasicRequest(
            AwsCredentialsIdentity.create("access", "secret"),
            httpRequest -> httpRequest.uri(URI.create("http://demo.us-east-1.amazonaws.com")),
            signRequest -> signRequest
                .putProperty(PAYLOAD_SIGNING_ENABLED, false)
                .putProperty(CHUNK_ENCODING_ENABLED, true)
                .putProperty(CHECKSUM_ALGORITHM, CRC32)
                .putProperty(CHECKSUM_STORE, cache)
        );

        SignedRequest signedRequest = signer.sign(request);

        String requestPayload = IoUtils.toUtf8String(signedRequest.payload().get().newStream());

        byte[] payloadChecksum = computeChecksum(CRC32, testPayload());

        assertThat(cache.getChecksumValue(CRC32)).isEqualTo(payloadChecksum);
        assertThat(requestPayload).contains("x-amz-checksum-crc32:" + BinaryUtils.toBase64(payloadChecksum) + "\r\n");
    }

    @Test
    void sign_WithPayloadSigningFalse_chunkEncodingTrue_cacheContainsChecksum_usesCachedValue() throws IOException {
        PayloadChecksumStore cache = PayloadChecksumStore.create();

        byte[] checksumValue = "my-checksum".getBytes(StandardCharsets.UTF_8);
        cache.putChecksumValue(CRC32, checksumValue);

        SignRequest<? extends AwsCredentialsIdentity> request = generateBasicRequest(
            AwsCredentialsIdentity.create("access", "secret"),
            httpRequest -> httpRequest.uri(URI.create("http://demo.us-east-1.amazonaws.com")),
            signRequest -> signRequest
                .putProperty(PAYLOAD_SIGNING_ENABLED, false)
                .putProperty(CHUNK_ENCODING_ENABLED, true)
                .putProperty(CHECKSUM_ALGORITHM, CRC32)
                .putProperty(CHECKSUM_STORE, cache)
        );

        SignedRequest signedRequest = signer.sign(request);

        String requestPayload = IoUtils.toUtf8String(signedRequest.payload().get().newStream());

        assertThat(requestPayload).contains("x-amz-checksum-crc32:" + BinaryUtils.toBase64(checksumValue) + "\r\n");
    }

    @Test
    void sign_withPayloadSigningTrue_chunkEncodingFalse_withChecksum_cacheContainsCrc32AndSha256_usesCachedValues() {
        PayloadChecksumStore cache = PayloadChecksumStore.create();

        byte[] crc32Value = "my-crc32-checksum".getBytes(StandardCharsets.UTF_8);
        cache.putChecksumValue(CRC32, crc32Value);

        SignRequest<? extends AwsCredentialsIdentity> request = generateBasicRequest(
            AwsCredentialsIdentity.create("access", "secret"),
            httpRequest -> httpRequest.uri(URI.create("http://demo.us-east-1.amazonaws.com")),
            signRequest -> signRequest
                .putProperty(PAYLOAD_SIGNING_ENABLED, true)
                .putProperty(CHUNK_ENCODING_ENABLED, false)
                .putProperty(CHECKSUM_ALGORITHM, CRC32)
                .putProperty(CHECKSUM_STORE, cache)
        );

        SignedRequest signedRequest = signer.sign(request);

        SdkHttpRequest httpRequest = signedRequest.request();

        assertThat(httpRequest.firstMatchingHeader("x-amz-checksum-crc32")).hasValue(BinaryUtils.toBase64(crc32Value));
    }

    @Test
    void sign_withPayloadSigningTrue_chunkEncodingFalse_withChecksum_cacheEmpty_storesComputeChecksums() {
        PayloadChecksumStore cache = PayloadChecksumStore.create();

        SignRequest<? extends AwsCredentialsIdentity> request = generateBasicRequest(
            AwsCredentialsIdentity.create("access", "secret"),
            httpRequest -> httpRequest.uri(URI.create("http://demo.us-east-1.amazonaws.com")),
            signRequest -> signRequest
                .putProperty(PAYLOAD_SIGNING_ENABLED, true)
                .putProperty(CHUNK_ENCODING_ENABLED, false)
                .putProperty(CHECKSUM_ALGORITHM, CRC32)
                .putProperty(CHECKSUM_STORE, cache)
        );

        signer.sign(request);

        byte[] crc32Value = computeChecksum(CRC32, testPayload());
        byte[] sha256Value = computeChecksum(SHA256, testPayload());

        AssertionsForClassTypes.assertThat(cache.getChecksumValue(SHA256)).isEqualTo(sha256Value);
        AssertionsForClassTypes.assertThat(cache.getChecksumValue(CRC32)).isEqualTo(crc32Value);
    }

    private static String asUtf8String(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    private static String asUtf8String(List<ByteBuffer> buffers) {
        return buffers.stream().map(ByteBuffer::duplicate).map(StandardCharsets.UTF_8::decode).collect(Collectors.joining());
    }

    private static byte[] computeChecksum(ChecksumAlgorithm algorithm, byte[] data) {
        SdkChecksum checksum = SdkChecksum.forAlgorithm(algorithm);
        checksum.update(data, 0, data.length);
        return checksum.getChecksumBytes();
    }

    private static List<ByteBuffer> getAllItems(Publisher<ByteBuffer> publisher) {
        return Flowable.fromPublisher(publisher).toList().blockingGet();
    }

}
